package cn.gupao.jobs;


import cn.gupao.pojo.DynamicKeyedBean;
import cn.gupao.pojo.LogBean;
import cn.gupao.pojo.MatchResult;
import cn.gupao.pojo.RulesBean;
import cn.gupao.udfs.*;
import cn.gupao.utils.FlinkUtils;
import cn.gupao.utils.StateDescriptorUtils;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

/**
 * Flink streaming job that matches real-time user events against dynamic marketing rules.
 *
 * <p>Rules are captured from MySQL via CDC and broadcast to all subtasks. New in this
 * version: events are keyed by the keyBy condition specified inside each rule (dynamic
 * keying) instead of a hard-coded field.
 *
 * <p>Expected program argument: {@code args[0]} — the Kafka configuration consumed by
 * {@code FlinkUtils.createKafkaStream} (presumably a properties-file path or topic;
 * TODO confirm against FlinkUtils).
 */
public class MarketRulesMatchV2 {

    public static void main(String[] args) throws Exception {

        // Fail fast with a clear message instead of an ArrayIndexOutOfBoundsException
        // when the Kafka configuration argument is missing (args[0] is used below).
        if (args.length < 1) {
            throw new IllegalArgumentException(
                    "Usage: MarketRulesMatchV2 <kafka-config> (required by FlinkUtils.createKafkaStream)");
        }

        StreamExecutionEnvironment env = FlinkUtils.env;

        // CDC source streaming rule changes from MySQL table flink.tb_marketing_rules.
        // NOTE(review): credentials are hard-coded; move them to external configuration
        // (e.g. job parameters or a secrets store) before production use.
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("bigdata001")
                .port(3306)
                .databaseList("flink") // set captured database
                .tableList("flink.tb_marketing_rules") // set captured table
                .username("root")
                .password("123456")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        // Parallelism 1 keeps the CDC change stream strictly ordered.
        DataStreamSource<String> rulesStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(1);

        // Parse the raw CDC JSON into RulesBean objects.
        SingleOutputStreamOperator<RulesBean> rulesBeanStream = rulesStream.process(new RulesJsonToBeanFunction());

        // Broadcast the rules so every event-processing subtask sees every rule.
        BroadcastStream<RulesBean> broadcastStream = rulesBeanStream.broadcast(StateDescriptorUtils.rulesStateDescriptor);

        /////////////////////

        // Read user behavior events from Kafka.
        DataStream<String> kafkaStream = FlinkUtils.createKafkaStream(args[0], SimpleStringSchema.class);

        // Convert and clean the raw JSON events into LogBean objects.
        SingleOutputStreamOperator<LogBean> beanStream = kafkaStream.process(new JsonToBeanFunction());

        // Event-time watermarks with zero allowed out-of-orderness, using the
        // event's own timestamp field.
        SingleOutputStreamOperator<LogBean> logBeanWithWaterMark = beanStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<LogBean>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<LogBean>() {
                            @Override
                            public long extractTimestamp(LogBean element, long recordTimestamp) {
                                return element.getTimeStamp();
                            }
                        }));

        // The keyBy condition used to be hard-coded (keyed by the user's device ID).
        // Improvement: key dynamically according to the condition carried by each rule.
        //KeyedStream<LogBean, String> keyedStream = logBeanWithWaterMark.keyBy(LogBean::getDeviceId);

        // First connect: join the event stream with the broadcast rules to extract
        // each rule's keyBy condition and attach the computed key to every event.
        SingleOutputStreamOperator<DynamicKeyedBean> dynamicKeyedBeanStream =
                logBeanWithWaterMark.connect(broadcastStream).process(new DynamicKeyProcessFunction());

        // Key the stream by the dynamically computed key value.
        KeyedStream<DynamicKeyedBean, String> keyedStream = dynamicKeyedBeanStream.keyBy(DynamicKeyedBean::getKeyValue);

        // Second connect: the keyed stream (using keyed state) joins the broadcast
        // rules (using broadcast state) to perform the actual rule matching.
        SingleOutputStreamOperator<MatchResult> matchResult =
                keyedStream.connect(broadcastStream).process(new RulesMatchFunctionV4());
        matchResult.print();
        env.execute();
    }

}
